﻿using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Nito.AsyncEx;
using CK.Sprite.RabbitMQ.Core;

namespace CK.Sprite.RabbitMQ.Service
{
    /// <summary>
    /// Keeps a name-keyed registry of started queues. Queue instances are resolved from the
    /// <see cref="IServiceProvider"/> using the types listed in
    /// <see cref="CommonRabbitMqOptions.CommonQueueTypes"/>.
    /// </summary>
    public class CommonQueueManager : ICommonQueueManager
    {
        /// <summary>Queues that have been started, keyed by queue name.</summary>
        protected ConcurrentDictionary<string, IRunnable> JobQueues { get; }

        /// <summary>Provider used to resolve queue instances by their configured type.</summary>
        protected IServiceProvider ServiceProvider { get; }

        /// <summary>Option values captured from the injected <see cref="IOptions{TOptions}"/> at construction.</summary>
        protected CommonRabbitMqOptions Options { get; }

        /// <summary>Single-permit semaphore that serializes on-demand queue creation in <see cref="GetAsync"/>.</summary>
        protected SemaphoreSlim SyncSemaphore { get; }

        /// <summary>
        /// Creates a manager with an empty queue registry.
        /// </summary>
        /// <param name="options">Wrapper whose <c>Value</c> supplies the queue configuration.</param>
        /// <param name="serviceProvider">Provider used later to resolve queue instances.</param>
        public CommonQueueManager(
            IOptions<CommonRabbitMqOptions> options,
            IServiceProvider serviceProvider)
        {
            ServiceProvider = serviceProvider;
            JobQueues = new ConcurrentDictionary<string, IRunnable>();
            SyncSemaphore = new SemaphoreSlim(1, 1);
            Options = options.Value;
        }

        /// <summary>
        /// Resolves and starts one queue for every entry in <c>Options.CommonQueueTypes</c>,
        /// then stores it in the registry under the entry's key (replacing any existing entry).
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to each queue's <c>StartAsync</c>.</param>
        /// <remarks>
        /// NOTE(review): this method does not take <see cref="SyncSemaphore"/>, so it is not
        /// serialized against on-demand creation in <see cref="GetAsync"/> — confirm callers
        /// do not run the two concurrently.
        /// </remarks>
        public async Task StartAsync(CancellationToken cancellationToken = default)
        {
            foreach (var queueType in Options.CommonQueueTypes)
            {
                var jobQueue = (IRunnable)ServiceProvider.GetRequiredService(queueType.Value);
                await jobQueue.StartAsync(cancellationToken);

                // Register only after a successful start so a failed queue is never handed out.
                JobQueues[queueType.Key] = jobQueue;
            }
        }

        /// <summary>
        /// Stops every registered queue one after another and then empties the registry.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to each queue's <c>StopAsync</c>.</param>
        /// <remarks>
        /// If a queue's <c>StopAsync</c> throws, the exception propagates: the remaining queues
        /// are not stopped and the registry is not cleared.
        /// </remarks>
        public async Task StopAsync(CancellationToken cancellationToken = default)
        {
            foreach (var jobQueue in JobQueues.Values)
            {
                await jobQueue.StopAsync(cancellationToken);
            }

            JobQueues.Clear();
        }

        /// <summary>
        /// Returns the queue registered under <paramref name="queueName"/>, creating, starting
        /// and registering it first when it is not in the registry yet.
        /// </summary>
        /// <param name="queueName">Name of the queue; must be a key of <c>Options.CommonQueueConfigs</c>.</param>
        /// <returns>The registered queue cast to <see cref="ICommonQueue"/>.</returns>
        /// <exception cref="ArgumentException">
        /// <paramref name="queueName"/> is not a key of <c>Options.CommonQueueConfigs</c>.
        /// </exception>
        public async Task<ICommonQueue> GetAsync(string queueName)
        {
            // Only existence of the configuration matters here; its value is not used.
            if (!Options.CommonQueueConfigs.ContainsKey(queueName))
            {
                throw new ArgumentException("未找到消息队列信息");
            }

            // Fast path: the queue already exists, no need to take the semaphore.
            if (JobQueues.TryGetValue(queueName, out var jobQueue))
            {
                return (ICommonQueue)jobQueue;
            }

            using (await SyncSemaphore.LockAsync())
            {
                // Re-check under the semaphore: another caller may have registered the queue
                // while this one was waiting.
                if (JobQueues.TryGetValue(queueName, out jobQueue))
                {
                    return (ICommonQueue)jobQueue;
                }

                // NOTE(review): validation above uses CommonQueueConfigs while the type comes
                // from CommonQueueTypes; a name missing from the latter makes this indexer
                // throw — confirm both collections are always populated together.
                jobQueue = (ICommonQueue)ServiceProvider.GetRequiredService(Options.CommonQueueTypes[queueName]);

                await jobQueue.StartAsync();

                JobQueues.TryAdd(queueName, jobQueue);

                return (ICommonQueue)jobQueue;
            }
        }
    }
}
